package com.wjk.kylin.lock.executor.zookeeper;

import com.wjk.kylin.lock.enums.LockType;
import com.wjk.kylin.lock.exception.LockException;
import com.wjk.kylin.lock.executor.AbstractLockExecutor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 支持读写锁、联锁、重入锁、不可重入锁
 * 分布式锁zookeeper处理器
 *
 * @author wangjinkui
 */
public class ZookeeperLockExecutor extends AbstractLockExecutor<InterProcessLock> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperLockExecutor.class);
    //用于支持重入锁
    private static final ThreadLocal<Map<String, InterProcessLock>> LOCK_DATA_THREAD_LOCAL = new ThreadLocal<>();

    private final CuratorFramework curatorFramework;

    public ZookeeperLockExecutor(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    /**
     * 加锁
     *
     * @param lockKey        锁标识
     * @param expire         锁有效时间
     * @param acquireTimeout 获取锁超时时间
     * @param lockType       锁类型
     * @param keySuffix      锁key后缀集合
     * @return 锁实例。 如果为空，上层调用则不会调用解锁
     */
    @Override
    public InterProcessLock acquire(String lockKey, long expire, long acquireTimeout, LockType lockType, String[] keySuffix) {
        if (!CuratorFrameworkState.STARTED.equals(curatorFramework.getState())) {
            LOGGER.warn("instance must be started before calling this method");
            return null;
        }
        //加锁实例
        InterProcessLock lockInstance = null;
        //加锁成功标识
        boolean locked = false;
        try {
            lockInstance = getLockInstance(lockKey, lockType, keySuffix);

            if (lockInstance == null) {
                throw new LockException("lock instance is null");
            }

            locked = lockInstance.acquire(acquireTimeout, TimeUnit.MILLISECONDS);
            return obtainLockInstance(locked, lockInstance);
        } catch (LockException e) {
            throw e;
        } catch (Exception e) {
            if (locked) {
                this.releaseLock(lockInstance, lockKey);
            }
            LOGGER.error("zookeeper lock acquire error", e);
            return null;
        } finally {
            if (!locked && lockInstance instanceof InterProcessMutex) {
                removeReentrantLockData(lockKey);
            }
        }
    }

    /**
     * 释放锁
     *
     * @param lockInstance 锁实例
     * @param lockKey      锁标识
     * @return 是否释放锁成功
     */
    @Override
    public boolean releaseLock(InterProcessLock lockInstance, String lockKey) {
        try {
            if (lockInstance instanceof InterProcessMutex) {
                InterProcessMutex interProcessMutex = (InterProcessMutex) lockInstance;
                if (interProcessMutex.isOwnedByCurrentThread()) {
                    interProcessMutex.release();
                }
            } else {
                if (lockInstance.isAcquiredInThisProcess()) {
                    lockInstance.release();
                }
            }
        } catch (Exception e) {
            LOGGER.error("zookeeper lock release error", e);
            return false;
        } finally {
            if (lockInstance instanceof InterProcessMutex) {
                //移除重入锁
                removeReentrantLockData(lockKey);
            }
        }
        return true;
    }


    /**
     * 获取加锁实例
     *
     * @param lockKey   加锁key
     * @param lockType  {@link LockType} 加锁类型 ： 重入锁、读写锁、联锁
     * @param keySuffix 锁后缀集合
     * @return 锁实例
     */
    @Override
    public InterProcessLock getLockInstance(String lockKey, LockType lockType, String[] keySuffix) {
        //以”/“ 开始
        String nodePath = lockKey.startsWith("/") ? lockKey : "/" + lockKey;

        InterProcessLock lockInstance = null;

        switch (lockType) {
            case REENTRANT:
                lockInstance = this.getReentrantLockData(nodePath);
                break;
            case READ:
                lockInstance = new InterProcessReadWriteLock(curatorFramework, nodePath).readLock();
                break;
            case WRITE:
                lockInstance = new InterProcessReadWriteLock(curatorFramework, nodePath).writeLock();
                break;
            case MULTI:
                lockInstance = this.getMultiLock(nodePath, keySuffix);
                break;
            case SEMAPHORE:
                lockInstance = new InterProcessSemaphoreMutex(curatorFramework, nodePath);
                break;
            default:
                LOGGER.error("lockType is not support ,lockType:{}", lockType);
        }
        return lockInstance;
    }

    /**
     * 移除重入锁
     *
     * @param lockKey 锁标识
     */
    private void removeReentrantLockData(String lockKey) {
        try {
            String nodePath = lockKey.startsWith("/") ? lockKey : "/" + lockKey;

            Map<String, InterProcessLock> lockInstanceMap = LOCK_DATA_THREAD_LOCAL.get();
            if (lockInstanceMap != null) {
                InterProcessLock lockInstance = lockInstanceMap.get(nodePath);
                if (lockInstance != null && !lockInstance.isAcquiredInThisProcess()) {
                    lockInstanceMap.remove(nodePath);
                }
                if (lockInstanceMap.size() == 0) {
                    LOCK_DATA_THREAD_LOCAL.remove();
                }
            }
        } catch (Exception e) {
            LOGGER.error("remove reentrant lock map error. lockKey:{}", lockKey, e);
        }
    }

    /**
     * 重入锁对象
     *
     * @param nodePath 锁key ，全路径
     * @return 重入锁对象
     */
    private InterProcessLock getReentrantLockData(String nodePath) {
        InterProcessLock lockInstance;
        Map<String, InterProcessLock> lockInstanceMap = LOCK_DATA_THREAD_LOCAL.get();
        if (lockInstanceMap == null) {
            lockInstance = new InterProcessMutex(curatorFramework, nodePath);
            lockInstanceMap = new HashMap<>();
            lockInstanceMap.put(nodePath, lockInstance);
            LOCK_DATA_THREAD_LOCAL.set(lockInstanceMap);
        } else {
            lockInstance = lockInstanceMap.get(nodePath);
            if (lockInstance == null) {
                lockInstance = new InterProcessMutex(curatorFramework, nodePath);
                lockInstanceMap.put(nodePath, lockInstance);
            }
        }
        return lockInstance;
    }

    /**
     * 组织联锁
     *
     * @param nodePath
     * @param keySuffix
     * @return
     */
    private InterProcessLock getMultiLock(String nodePath, String[] keySuffix) {
        keySuffix = super.defaultKeySuffix(keySuffix);

        List<InterProcessLock> list = new ArrayList<>(keySuffix.length);
        for (String suffix : keySuffix) {
            StringBuilder sb = new StringBuilder(nodePath);
            if (StringUtils.hasText(suffix)) {
                sb.append("/").append(suffix);
            }
            list.add(new InterProcessMutex(curatorFramework, sb.toString()));
        }

        return new InterProcessMultiLock(list);
    }
}